package com.jscloud.bigdata.source;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

public class MySqlSource extends AbstractSource implements Configurable, PollableSource {

        //打印日志
        private static final Logger LOG = LoggerFactory.getLogger(MySqlSource.class);
        //自定义工具类QueryMysql的对象
        private QueryMysql sqlSourceHelper;

        @Override
        public long getBackOffSleepIncrement() {
                return 0;
        }

        @Override
        public long getMaxBackOffSleepInterval() {
                return 0;
        }

        @Override
        public void configure(Context context) {
                //初始化
                try {
                        sqlSourceHelper = new QueryMysql(context);
                } catch (ParseException e) {
                        e.printStackTrace();
                }
        }

        /**
         * 接受mysql表中的数据
         *
         * @return
         * @throws EventDeliveryException
         */
        @Override
        public PollableSource.Status process() throws EventDeliveryException {
                try {
                        //查询数据表
                        List<List<Object>> result = sqlSourceHelper.executeQuery();
                        //存放event的集合
                        List<Event> events = new ArrayList<>();
                        //存放event头集合
                        HashMap<String, String> header = new HashMap<>();
                        //如果有返回数据，则将数据封装为event
                        if (!result.isEmpty()) {
                                List<String> allRows = sqlSourceHelper.getAllRows(result);
                                Event event = null;
                                for (String row : allRows) {
                                        event = new SimpleEvent();
                                        event.setBody(row.getBytes());
                                        event.setHeaders(header);
                                        events.add(event);
                                }
                                //将event写入channel
                                this.getChannelProcessor().processEventBatch(events);
                                //更新数据表中的offset信息
                                sqlSourceHelper.updateOffset2DB(result.size());
                        }
                        //等待时长
                        Thread.sleep(sqlSourceHelper.getRunQueryDelay());
                        return Status.READY;
                } catch (InterruptedException e) {
                        LOG.error("Error procesing row", e);
                        return Status.BACKOFF;
                }
        }

        @Override
        public synchronized void stop() {
                LOG.info("Stopping sql source {} ...", getName());
                try {
                        //关闭资源
                        sqlSourceHelper.close();
                } finally {
                        super.stop();
                }
        }
}
